# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# License: MIT. See LICENSE

# metadata

"""
Load metadata (DocType) class

Example:

	meta = frappe.get_meta('User')
	if meta.has_field('first_name'):
		print('DocType "User" has field "first_name"')


"""

import json
import os
import typing
from datetime import datetime

import click

import frappe
from frappe import _, _lt
from frappe.model import (
	NO_VALUE_FIELDS,
	child_table_fields,
	data_fieldtypes,
	default_fields,
	optional_fields,
	table_fields,
)
from frappe.model.base_document import (
	DOCTYPE_TABLE_FIELDS,
	TABLE_DOCTYPES_FOR_DOCTYPE,
	BaseDocument,
)
from frappe.model.document import Document
from frappe.model.utils import is_single_doctype
from frappe.model.workflow import get_workflow_name
from frappe.modules import load_doctype_module
from frappe.utils import cached_property, cast, cint, cstr
from frappe.utils.caching import site_cache
from frappe.utils.data import add_to_date, get_datetime

# Type aliases used by `_serialize` when deciding which values survive serialization.
ListOrTuple = list | tuple
SerializableTypes = str | int | float | datetime

# Human-readable labels for standard and underscore-prefixed fields that have
# no DocField row of their own (see `Meta.get_label`). `_lt` defers translation.
DEFAULT_FIELD_LABELS = {
	"name": _lt("ID"),
	"creation": _lt("Created On"),
	"docstatus": _lt("Document Status"),
	"idx": _lt("Index"),
	"modified": _lt("Last Updated On"),
	"modified_by": _lt("Last Updated By"),
	"owner": _lt("Created By"),
	"_user_tags": _lt("Tags"),
	"_liked_by": _lt("Liked By"),
	"_comments": _lt("Comments"),
	"_assign": _lt("Assigned To"),
}

# When number of rows in a table exceeds this number, we disable certain features automatically.
# This is done to avoid hammering the site with unnecessary requests that are just meant for
# improving UX.
LARGE_TABLE_SIZE_THRESHOLD = 100_000
LARGE_TABLE_RECENCY_THRESHOLD = 30  # days


def get_meta(doctype: "str | DocType", cached: bool = True) -> "_Meta":
	"""Get metadata for a doctype.

	Args:
	    doctype: The doctype name as a string, or a DocType document object.
	    cached: Whether to use cached metadata (default: True).

	Returns:
	    Meta object for the given doctype.
	"""
	# the cache is keyed by name, so a DocType document argument always rebuilds
	if (
		cached
		and isinstance(doctype, str)
		and (meta := frappe.client_cache.get_value(f"doctype_meta::{doctype}"))
	):
		return meta

	meta = Meta(doctype)

	# store the freshly built meta even when cached=False, so later calls benefit
	key = f"doctype_meta::{meta.name}"
	frappe.client_cache.set_value(key, meta)
	return meta


def clear_meta_cache(doctype: str = "*"):
	"""Evict cached meta for `doctype`; pass "*" (the default) to evict all doctypes."""
	cache_key = f"doctype_meta::{doctype}"
	if doctype != "*":
		frappe.client_cache.delete_value(cache_key)
	else:
		# "*" acts as a wildcard in the key pattern, clearing every doctype's meta
		frappe.client_cache.delete_keys(cache_key)


def load_meta(doctype):
	"""Build a fresh (uncached) Meta for `doctype`."""
	meta = Meta(doctype)
	return meta


def get_table_columns(doctype):
	"""Return the database column names of the table backing `doctype`."""
	columns = frappe.db.get_table_columns(doctype)
	return columns


def load_doctype_from_file(doctype):
	"""Load a core doctype's definition directly from its JSON file.

	Fallback used by `Meta.load_from_db` when the DocType record is not yet
	available in the database. Only looks inside the `frappe` app's core
	doctype folder.

	Return the parsed dict with `fields` / `permissions` rows wrapped in
	`BaseDocument` objects.
	"""
	fname = frappe.scrub(doctype)
	with open(frappe.get_app_path("frappe", "core", "doctype", fname, fname + ".json")) as f:
		txt = json.load(f)

	for d in txt.get("fields", []):
		d["doctype"] = "DocField"

	for d in txt.get("permissions", []):
		d["doctype"] = "DocPerm"

	# use .get() here too: a JSON file without a `fields` key should not raise KeyError
	txt["fields"] = [BaseDocument(d) for d in txt.get("fields", [])]
	if "permissions" in txt:
		txt["permissions"] = [BaseDocument(d) for d in txt["permissions"]]

	return txt


class Meta(Document):
	"""Processed, cached metadata for a DocType.

	Wraps the DocType document and layers on site-level customizations
	(Custom Fields, Property Setters, custom links/actions/states and
	Custom DocPerms), sorts fields, and exposes the convenience accessors
	used throughout the framework.
	"""

	_metaclass = True
	# standard fields minus the first entry of the module-level tuple
	default_fields = list(default_fields)[1:]
	# doctypes that must be loadable before customizations exist (bootstrapping)
	special_doctypes = frozenset(
		(
			"DocField",
			"DocPerm",
			"DocType",
			"Module Def",
			"DocType Action",
			"DocType Link",
			"DocType State",
		)
	)
	# standard fields that behave as set_only_once even without a DocField flag
	standard_set_once_fields = (
		frappe._dict(fieldname="creation", fieldtype="Datetime"),
		frappe._dict(fieldname="owner", fieldtype="Data"),
	)

	def __init__(self, doctype: "str | DocType"):
		"""Build meta from a doctype name or an already-loaded DocType document."""
		if isinstance(doctype, Document):
			super().__init__(doctype.as_dict())
		else:
			super().__init__("DocType", doctype)

		self.process()

	def load_from_db(self):
		"""Load the DocType record; fall back to its JSON file for special doctypes."""
		try:
			super().load_from_db()
		except frappe.DoesNotExistError:
			if self.doctype == "DocType" and self.name in self.special_doctypes:
				self.__dict__.update(load_doctype_from_file(self.name))
			else:
				raise
	def process(self):
		"""Apply customizations and build internal caches. Order matters here."""
		# don't process for special doctypes
		# prevents circular dependency
		if self.name in self.special_doctypes:
			if self.name == "DocPerm":
				self.add_custom_fields()
			self.init_field_caches()
			return

		self.add_custom_fields()
		self.apply_property_setters()
		self.init_field_caches()
		self.sort_fields()
		self.get_valid_columns()
		self.set_custom_permissions()
		self.add_custom_links_and_actions()
		self.check_if_large_table()

	def as_dict(self, no_nulls=False):
		"""Return a plain-dict, JSON-serializable snapshot of this meta."""
		return _serialize(self, no_nulls=no_nulls)

	def get_link_fields(self):
		"""Return all Link docfields (excluding the placeholder '[Select]')."""
		return self.get("fields", {"fieldtype": "Link", "options": ["!=", "[Select]"]})

	def get_data_fields(self):
		"""Return all Data docfields."""
		return self.get("fields", {"fieldtype": "Data"})

	def get_phone_fields(self):
		"""Return all Phone docfields."""
		return self.get("fields", {"fieldtype": "Phone"})

	def get_dynamic_link_fields(self):
		"""Return all Dynamic Link docfields (cached)."""
		return self._dynamic_link_fields

	def get_masked_fields(self):
		"""Return copies of masked docfields the session user may not unmask.

		Results are cached per (doctype, user); Administrator is never masked.
		"""
		import copy

		if frappe.session.user == "Administrator":
			return []
		cache_key = f"masked_fields::{self.name}::{frappe.session.user}"
		masked_fields = frappe.cache.get_value(cache_key)

		if masked_fields is None:
			masked_fields = []
			for df in self.fields:
				if df.get("mask") and not self.has_permlevel_access_to(
					fieldname=df.fieldname, df=df, permission_type="mask"
				):
					# work on a copy instead of original df
					df_copy = copy.deepcopy(df)
					df_copy.mask_readonly = 1
					masked_fields.append(df_copy)
			frappe.cache.set_value(cache_key, masked_fields)

		return masked_fields

	@cached_property
	def _dynamic_link_fields(self):
		return self.get("fields", {"fieldtype": "Dynamic Link"})

	def get_select_fields(self):
		"""Return Select docfields, skipping placeholder option strings."""
		return self.get("fields", {"fieldtype": "Select", "options": ["not in", ["[Select]", "Loading..."]]})

	def get_image_fields(self):
		"""Return all Attach Image docfields."""
		return self.get("fields", {"fieldtype": "Attach Image"})

	def get_code_fields(self):
		"""Return all Code docfields."""
		return self.get("fields", {"fieldtype": "Code"})

	def get_set_only_once_fields(self):
		"""Return fields with `set_only_once` set"""
		return self._set_only_once_fields

	@cached_property
	def _set_only_once_fields(self):
		# user-flagged fields plus the always set-once standard fields
		set_only_once_fields = self.get("fields", {"set_only_once": 1})
		fieldnames = [d.fieldname for d in set_only_once_fields]

		for df in self.standard_set_once_fields:
			if df.fieldname not in fieldnames:
				set_only_once_fields.append(df)

		return set_only_once_fields

	def get_table_fields(self, include_computed=False):
		"""Return child-table docfields; virtual ones only if `include_computed`."""
		return self._table_fields if include_computed else self._non_computed_table_fields

	def get_global_search_fields(self):
		"""Return list of fields with `in_global_search` set and `name` if set."""
		fields = self.get("fields", {"in_global_search": 1, "fieldtype": ["not in", NO_VALUE_FIELDS]})
		if getattr(self, "show_name_in_global_search", None):
			fields.append(frappe._dict(fieldtype="Data", fieldname="name", label="Name"))

		return fields

	def get_valid_columns(self) -> list[str]:
		"""Return the column names that actually exist in this doctype's table."""
		return self._valid_columns

	@cached_property
	def _valid_columns(self):
		table_exists = frappe.db.table_exists(self.name)
		if self.name in self.special_doctypes and table_exists:
			valid_columns = get_table_columns(self.name)
		else:
			# virtual fields never have a database column
			valid_columns = self.default_fields + [
				df.fieldname
				for df in self.get("fields")
				if not getattr(df, "is_virtual", False) and df.fieldtype in data_fieldtypes
			]
			if self.istable:
				valid_columns += list(child_table_fields)

		return valid_columns

	def get_valid_fields(self) -> list[str]:
		"""Return fieldnames that hold values (includes virtual fields)."""
		return self._valid_fields

	@cached_property
	def _valid_fields(self):
		if (frappe.flags.in_install or frappe.flags.in_migrate) and self.name in self.special_doctypes:
			valid_fields = get_table_columns(self.name)
		else:
			valid_fields = self.default_fields + [
				df.fieldname for df in self.get("fields") if df.fieldtype in data_fieldtypes
			]
			if self.istable:
				valid_fields += list(child_table_fields)

		return valid_fields

	def get_field(self, fieldname):
		"""Return docfield from meta."""

		return self._fields.get(fieldname)

	def has_field(self, fieldname):
		"""Return True if fieldname exists."""

		return fieldname in self._fields

	def get_label(self, fieldname):
		"""Return label of the given fieldname."""
		if df := self.get_field(fieldname):
			return df.get("label")

		if fieldname in DEFAULT_FIELD_LABELS:
			return str(DEFAULT_FIELD_LABELS[fieldname])

		return "No Label"

	def get_options(self, fieldname):
		"""Return the `options` of the given fieldname."""
		return self.get_field(fieldname).options

	def get_link_doctype(self, fieldname):
		"""Return the doctype a Link/Dynamic Link field points to, else None."""
		df = self.get_field(fieldname)

		if df.fieldtype == "Link":
			return df.options

		if df.fieldtype == "Dynamic Link":
			# for dynamic links, `options` names the field that holds the doctype
			return self.get_options(df.options)

	def get_search_fields(self):
		"""Return `search_fields` as a list, always including "name"."""
		search_fields = self.search_fields or "name"
		search_fields = [d.strip() for d in search_fields.split(",")]
		if "name" not in search_fields:
			search_fields.append("name")

		return search_fields

	def get_fields_to_fetch(self, link_fieldname=None):
		"""Return a list of docfield objects for fields whose values
		are to be fetched and updated for a particular link field.

		These fields are of type Data, Link, Text, Readonly and their
		fetch_from property is set as `link_fieldname`.`source_fieldname`"""

		out = []

		if not link_fieldname:
			link_fields = [df.fieldname for df in self.get_link_fields()]

		for df in self.fields:
			if df.fieldtype not in NO_VALUE_FIELDS and getattr(df, "fetch_from", None):
				if link_fieldname:
					if df.fetch_from.startswith(link_fieldname + "."):
						out.append(df)
				else:
					if "." in df.fetch_from:
						fieldname = df.fetch_from.split(".", 1)[0]
						if fieldname in link_fields:
							out.append(df)

		return out

	def get_list_fields(self):
		"""Return fieldnames shown in list view ("name", in_list_view fields, title)."""
		list_fields = ["name"] + [
			d.fieldname for d in self.fields if (d.in_list_view and d.fieldtype in data_fieldtypes)
		]
		if self.title_field and self.title_field not in list_fields:
			list_fields.append(self.title_field)
		return list_fields

	def get_custom_fields(self):
		"""Return docfields added via Custom Field records."""
		return [d for d in self.fields if getattr(d, "is_custom_field", False)]

	def get_title_field(self):
		"""Return the title field of this doctype,
		explicit via `title_field`, or `title` or `name`"""
		title_field = getattr(self, "title_field", None)
		if not title_field and self.has_field("title"):
			title_field = "title"
		if not title_field:
			title_field = "name"

		return title_field

	def get_translatable_fields(self):
		"""Return all fields that are translation enabled"""
		return [d.fieldname for d in self.fields if d.translatable]

	def is_translatable(self, fieldname):
		"""Return whether the given field is translation enabled (None if missing)."""

		if field := self.get_field(fieldname):
			return field.translatable

	def get_workflow(self):
		"""Return the name of the active workflow for this doctype, if any."""
		return get_workflow_name(self.name)

	def get_naming_series_options(self) -> list[str]:
		"""Get list naming series options."""

		if field := self.get_field("naming_series"):
			options = field.options or ""
			return options.split("\n")

		return []

	def add_custom_fields(self):
		"""Append Custom Field rows for this doctype to `fields`."""
		if not frappe.db.table_exists("Custom Field"):
			return

		custom_fields = frappe.db.get_values(
			"Custom Field",
			filters={"dt": self.name},
			fieldname="*",
			as_dict=True,
			order_by="idx",
			update={"is_custom_field": 1},
		)

		if not custom_fields:
			return

		self.extend("fields", custom_fields)

	def apply_property_setters(self):
		"""Overlay Property Setter values onto the doctype and its child rows."""
		if not frappe.db.table_exists("Property Setter"):
			return

		property_setters = frappe.db.get_values(
			"Property Setter",
			filters={"doc_type": self.name},
			fieldname="*",
			as_dict=True,
		)

		if not property_setters:
			return

		for ps in property_setters:
			if ps.doctype_or_field == "DocType":
				self.set(ps.property, cast(ps.property_type, ps.value))

			elif ps.doctype_or_field == "DocField":
				for d in self.fields:
					if d.fieldname == ps.field_name:
						d.set(ps.property, cast(ps.property_type, ps.value))
						break

			elif ps.doctype_or_field == "DocType Link":
				for d in self.links:
					if d.name == ps.row_name:
						d.set(ps.property, cast(ps.property_type, ps.value))
						break

			elif ps.doctype_or_field == "DocType Action":
				for d in self.actions:
					if d.name == ps.row_name:
						d.set(ps.property, cast(ps.property_type, ps.value))
						break

			elif ps.doctype_or_field == "DocType State":
				for d in self.states:
					if d.name == ps.row_name:
						d.set(ps.property, cast(ps.property_type, ps.value))
						break

	def add_custom_links_and_actions(self):
		"""Append custom links/actions/states and reorder them if an order is saved."""
		for doctype, fieldname in (
			("DocType Link", "links"),
			("DocType Action", "actions"),
			("DocType State", "states"),
		):
			# ignore_ddl because the `custom` column was added later via a patch
			for d in frappe.get_all(
				doctype, fields="*", filters=dict(parent=self.name, custom=1), ignore_ddl=True
			):
				self.append(fieldname, d)

			# set the fields in order if specified
			# order is saved as `links_order`
			order = json.loads(self.get(f"{fieldname}_order") or "[]")
			if order:
				name_map = {d.name: d for d in self.get(fieldname)}
				new_list = [name_map[name] for name in order if name in name_map]
				# add the missing items that have not be added
				# maybe these items were added to the standard product
				# after the customization was done
				for d in self.get(fieldname):
					if d not in new_list:
						new_list.append(d)

				self.set(fieldname, new_list)

	def check_if_large_table(self):
		"""Apply some heuristics to detect large tables.

		UI code can use this information to adapt accordingly."""
		# Note: `modified` should be used in older versions.
		self.is_large_table = False
		if self.istable or not frappe.db.table_exists(self.name):  # During install, new migrate
			return

		if frappe.db.estimate_count(self.name) > LARGE_TABLE_SIZE_THRESHOLD:
			# only flag tables that are both large AND recently active
			recent_change = frappe.db.get_value(self.name, {}, "creation", order_by="creation desc")
			if get_datetime(recent_change) > add_to_date(None, days=-1 * LARGE_TABLE_RECENCY_THRESHOLD):
				self.is_large_table = True

	@cached_property
	def _fields(self):
		# fieldname -> docfield lookup table
		return {field.fieldname: field for field in self.fields}

	@cached_property
	def _table_fields(self):
		if self.name == "DocType":
			return DOCTYPE_TABLE_FIELDS

		return self.get("fields", {"fieldtype": ["in", table_fields]})

	@cached_property
	def _non_computed_table_fields(self):
		if self.name == "DocType":
			return self._table_fields

		return self.get("fields", {"fieldtype": ["in", table_fields], "is_virtual": 0})

	@cached_property
	def _table_doctypes(self):
		# table fieldname -> child doctype name
		return {field.fieldname: field.options for field in self._table_fields}

	@cached_property
	def _non_computed_table_doctypes(self):
		return {field.fieldname: field.options for field in self._non_computed_table_fields}

	def init_field_caches(self):
		"""Eagerly evaluate the cached properties so they are computed before caching."""
		self._fields
		self._table_fields
		self._non_computed_table_fields
		self._table_doctypes
		self._non_computed_table_doctypes

	def sort_fields(self):
		"""
		Sort fields on the basis of following rules (priority descending):
		- `field_order` property setter
		- `insert_after` computed based on default order for standard fields
		- `insert_after` property for custom fields
		"""

		if field_order := getattr(self, "field_order", []):
			field_order = [fieldname for fieldname in json.loads(field_order) if fieldname in self._fields]

			# all fields match, best case scenario
			if len(field_order) == len(self.fields):
				self._update_fields_based_on_order(field_order)
				return

			# if the first few standard fields are not in the field order, prepare to prepend them
			if self.fields[0].fieldname not in field_order:
				fields_to_prepend = []
				standard_field_found = False

				for fieldname, field in self._fields.items():
					if getattr(field, "is_custom_field", False):
						# all custom fields from here on
						break

					if fieldname in field_order:
						standard_field_found = True
						break

					fields_to_prepend.append(fieldname)

				if standard_field_found:
					field_order = fields_to_prepend + field_order
				else:
					# worst case scenario, invalidate field_order
					field_order = fields_to_prepend

		existing_fields = set(field_order) if field_order else False
		insertion_map = {}

		for index, field in enumerate(self.fields):
			if existing_fields and field.fieldname in existing_fields:
				continue

			if not getattr(field, "is_custom_field", False):
				if existing_fields:
					# compute insert_after from previous field
					insertion_map.setdefault(self.fields[index - 1].fieldname, []).append(field.fieldname)
				else:
					field_order.append(field.fieldname)

			elif target_position := getattr(field, "insert_after", None):
				original_target = target_position
				if field.fieldtype in ["Section Break", "Column Break"] and target_position in field_order:
					# Find the next section or column break and set target_position to just one field before
					for current_field in field_order[field_order.index(target_position) + 1 :]:
						if self._fields[current_field].fieldtype == "Section Break" or (
							self._fields[current_field].fieldtype == self._fields[original_target].fieldtype
						):
							# Break out to add this just after the last field
							break
						target_position = current_field
				insertion_map.setdefault(target_position, []).append(field.fieldname)

			else:
				# if custom field is at the top, insert after is None
				field_order.insert(0, field.fieldname)

		if insertion_map:
			_update_field_order_based_on_insert_after(field_order, insertion_map)

		self._update_fields_based_on_order(field_order)

	def _update_fields_based_on_order(self, field_order):
		"""Rewrite `self.fields` to match `field_order`, renumbering `idx` from 1."""
		sorted_fields = []

		for idx, fieldname in enumerate(field_order, 1):
			field = self._fields[fieldname]
			field.idx = idx
			sorted_fields.append(field)

		self.fields = sorted_fields

	def set_custom_permissions(self):
		"""Reset `permissions` with Custom DocPerm if exists"""
		if frappe.flags.in_patch or frappe.flags.in_install:
			return

		if not self.istable and self.name not in ("DocType", "DocField", "DocPerm", "Custom DocPerm"):
			custom_perms = frappe.get_all(
				"Custom DocPerm",
				fields="*",
				filters=dict(parent=self.name),
				update=dict(doctype="Custom DocPerm"),
			)
			if custom_perms:
				self.permissions = [Document(d) for d in custom_perms]

	def get_fieldnames_with_value(self, with_field_meta=False, with_virtual_fields=False):
		"""Return fieldnames (or docfields if `with_field_meta`) that hold a value."""
		def is_value_field(df):
			return (df.fieldtype not in NO_VALUE_FIELDS) and (
				with_virtual_fields or not getattr(df, "is_virtual", False)
			)

		if with_field_meta:
			return [df for df in self.fields if is_value_field(df)]

		return [df.fieldname for df in self.fields if is_value_field(df)]

	def get_fields_to_check_permissions(self, user_permission_doctypes):
		"""Return Link docfields whose options fall under user permission doctypes."""
		fields = self.get(
			"fields",
			{
				"fieldtype": "Link",
				"parent": self.name,
				"ignore_user_permissions": ("!=", 1),
				"options": ("in", user_permission_doctypes),
			},
		)

		if self.name in user_permission_doctypes:
			fields.append(frappe._dict({"label": "Name", "fieldname": "name", "options": self.name}))

		return fields

	def get_high_permlevel_fields(self):
		"""Build list of fields with high perm level and all the higher perm levels defined."""
		return self.high_permlevel_fields

	@cached_property
	def high_permlevel_fields(self):
		return [df for df in self.fields if df.permlevel > 0]

	def get_permitted_fieldnames(
		self,
		parenttype=None,
		*,
		user=None,
		permission_type="read",
		with_virtual_fields=True,
	):
		"""Build list of `fieldname` with read perm level and all the higher perm levels defined.

		Note: If permissions are not defined for DocType, return all the fields with value.
		"""
		permitted_fieldnames = []

		# child tables inherit permissions from their parent; no parent, no fields
		if self.istable and not parenttype:
			return permitted_fieldnames

		if not permission_type:
			permission_type = "select" if frappe.only_has_select_perm(self.name, user=user) else "read"

		if permission_type == "select":
			return self.get_search_fields()

		if not self.get_permissions(parenttype=parenttype):
			return self.get_fieldnames_with_value()

		permlevel_access = set(
			self.get_permlevel_access(permission_type=permission_type, parenttype=parenttype, user=user)
		)

		# shared documents grant base (permlevel 0) access for read/select
		if 0 not in permlevel_access and permission_type in ("read", "select"):
			if frappe.share.get_shared(self.name, user, rights=[permission_type], limit=1):
				permlevel_access.add(0)

		permitted_fieldnames.extend(
			df.fieldname
			for df in self.get_fieldnames_with_value(
				with_field_meta=True, with_virtual_fields=with_virtual_fields
			)
			if df.permlevel in permlevel_access
		)
		return permitted_fieldnames

	def get_permlevel_access(self, permission_type="read", parenttype=None, *, user=None):
		"""Return the permlevels the user's roles grant for `permission_type`."""
		has_access_to = []
		roles = set(frappe.get_roles(user))
		for perm in self.get_permissions(parenttype):
			if perm.role in roles and perm.get(permission_type):
				if perm.permlevel not in has_access_to:
					has_access_to.append(perm.permlevel)

		return has_access_to

	def get_permissions(self, parenttype=None):
		"""Return permission rows; child tables use their parent's permissions."""
		if self.istable and parenttype:
			# use parent permissions
			permissions = frappe.get_meta(parenttype).permissions
		else:
			permissions = self.get("permissions", [])

		return permissions

	def get_dashboard_data(self):
		"""Return dashboard setup related to this doctype.

		This method will return the `data` property in the `[doctype]_dashboard.py`
		file in the doctype's folder, along with any overrides or extensions
		implemented in other Frappe applications via hooks.
		"""
		data = frappe._dict()
		if not self.custom:
			try:
				module = load_doctype_module(self.name, suffix="_dashboard")
				if hasattr(module, "get_data"):
					data = frappe._dict(module.get_data())
			except ImportError:
				pass

		self.add_doctype_links(data)

		if not self.custom:
			for hook in frappe.get_hooks("override_doctype_dashboards", {}).get(self.name, []):
				data = frappe._dict(frappe.get_attr(hook)(data=data))

		return data

	def add_doctype_links(self, data):
		"""add `links` child table in standard link dashboard format"""
		dashboard_links = []

		if getattr(self, "links", None):
			dashboard_links.extend(self.links)

		if not data.transactions:
			# init groups
			data.transactions = []

		if not data.non_standard_fieldnames:
			data.non_standard_fieldnames = {}

		if not data.internal_links:
			data.internal_links = {}

		for link in dashboard_links:
			link.added = False
			if link.hidden:
				continue

			for group in data.transactions:
				group = frappe._dict(group)

				# For internal links parent doctype will be the key
				doctype = link.parent_doctype or link.link_doctype
				# group found
				if link.group and _(group.label) == _(link.group):
					if doctype not in group.get("items"):
						group.get("items").append(doctype)
					link.added = True

			if not link.added:
				# group not found, make a new group
				data.transactions.append(
					dict(label=link.group, items=[link.parent_doctype or link.link_doctype])
				)

			if not data.fieldname and link.link_fieldname:
				data.fieldname = link.link_fieldname

			if not link.is_child_table:
				data.non_standard_fieldnames[link.link_doctype] = link.link_fieldname
			elif link.is_child_table:
				data.internal_links[link.parent_doctype] = [link.table_fieldname, link.link_fieldname]

	def get_row_template(self):
		"""Return the relative path of this doctype's web row template, if any."""
		return self.get_web_template(suffix="_row")

	def get_list_template(self):
		"""Return the relative path of this doctype's web list template, if any."""
		return self.get_web_template(suffix="_list")

	def get_web_template(self, suffix=""):
		"""Return the relative path of the row template for this doctype."""
		module_name = frappe.scrub(self.module)
		doctype = frappe.scrub(self.name)
		template_path = frappe.get_module_path(
			module_name, "doctype", doctype, "templates", doctype + suffix + ".html"
		)
		if os.path.exists(template_path):
			return f"{module_name}/doctype/{doctype}/templates/{doctype}{suffix}.html"
		return None

	def is_nested_set(self):
		"""Return True if this doctype is a nested set (has `lft`/`rgt` fields)."""
		return self.has_field("lft") and self.has_field("rgt")


#######


def get_parent_dt(dt):
	"""Return the parent doctype that embeds child table `dt`, or "" if none."""
	if not frappe.is_table(dt):
		return ""

	parent = frappe.db.get_value(
		"DocField",
		{"fieldtype": ("in", frappe.model.table_fields), "options": dt},
		"parent",
	)
	return parent or ""


def set_fieldname(field_id, fieldname):
	"""Rename the DocField record `field_id` to `fieldname` in the database."""
	frappe.db.set_value("DocField", field_id, "fieldname", fieldname)


def get_field_currency(df, doc=None):
	"""get currency based on DocField options and fieldvalue in doc"""
	currency = None

	if not df.get("options"):
		return None

	if not doc:
		return None

	# request-local cache shaped as {(doctype, docname): {fieldname: currency}}
	if not getattr(frappe.local, "field_currency", None):
		frappe.local.field_currency = frappe._dict()

	# compute only on a cache miss (checked against both the doc and its parent)
	if not (
		frappe.local.field_currency.get((doc.doctype, doc.name), {}).get(df.fieldname)
		or (
			doc.get("parent")
			and frappe.local.field_currency.get((doc.doctype, doc.parent), {}).get(df.fieldname)
		)
	):
		ref_docname = doc.get("parent") or doc.name

		# options of the form "Doctype:link_fieldname:currency_fieldname":
		# fetch the currency from the linked document
		if ":" in cstr(df.get("options")):
			split_opts = df.get("options").split(":")
			if len(split_opts) == 3 and doc.get(split_opts[1]):
				currency = frappe.get_cached_value(split_opts[0], doc.get(split_opts[1]), split_opts[2])
		else:
			# otherwise options names the field on this doc that holds the currency
			currency = doc.get(df.get("options"))
			if doc.get("parenttype"):
				if currency:
					ref_docname = doc.name
				else:
					if frappe.get_meta(doc.parenttype).has_field(df.get("options")):
						# only get_value if parent has currency field
						currency = frappe.db.get_value(doc.parenttype, doc.parent, df.get("options"))

		if currency:
			# setdefault: never overwrite a currency that was cached earlier
			frappe.local.field_currency.setdefault((doc.doctype, ref_docname), frappe._dict()).setdefault(
				df.fieldname, currency
			)

	return frappe.local.field_currency.get((doc.doctype, doc.name), {}).get(df.fieldname) or (
		doc.get("parent") and frappe.local.field_currency.get((doc.doctype, doc.parent), {}).get(df.fieldname)
	)


def get_field_precision(df, doc=None, currency=None):
	"""get precision based on DocField options and fieldvalue in doc"""
	# an explicit per-field precision always wins
	if df.precision:
		return cint(df.precision)

	if df.fieldtype == "Currency":
		precision = cint(frappe.db.get_default("currency_precision"))
		if precision:
			return precision
		# no site-wide currency precision: derive it from the currency's format
		return get_precision_from_currency_format(currency or get_field_currency(df, doc))

	return cint(frappe.db.get_default("float_precision")) or 3


def get_precision_from_currency_format(currency: str) -> int:
	"""Get precision from currency format string if applicable."""
	from frappe.locale import get_number_format
	from frappe.utils.number_format import NumberFormat

	number_format = get_number_format()
	if frappe.get_system_settings("use_number_format_from_currency"):
		currency_format = frappe.db.get_value("Currency", currency, "number_format", cache=True)
		if currency_format:
			number_format = NumberFormat.from_string(currency_format)
	return number_format.precision


def get_default_df(fieldname):
	"""Return a synthetic docfield describing a standard or child-table field.

	Return None for fieldnames that are not standard fields."""
	if fieldname not in (default_fields + child_table_fields):
		return None

	if fieldname in ("creation", "modified"):
		return frappe._dict(fieldname=fieldname, fieldtype="Datetime")

	if fieldname in ("idx", "docstatus"):
		return frappe._dict(fieldname=fieldname, fieldtype="Int")

	if fieldname in ("owner", "modified_by"):
		return frappe._dict(fieldname=fieldname, fieldtype="Link", options="User")

	# everything else is plain data
	return frappe._dict(fieldname=fieldname, fieldtype="Data")


def trim_tables(doctype=None, dry_run=False, quiet=False):
	"""
	Removes database fields that don't exist in the doctype (json or custom field). This may be needed
	as maintenance since removing a field in a DocType doesn't automatically
	delete the db field.

	Args:
		doctype: Trim only this doctype's table; trim all doctypes when None.
		dry_run: If True, only report the columns that would be dropped.
		quiet: If True, suppress warnings about missing/broken tables.

	Return a dict mapping doctype name -> list of dropped column names.
	"""
	# plain local; was previously named UPPER_SNAKE which suggests a constant
	updated_tables = {}
	filters = {"issingle": 0, "is_virtual": 0}
	if doctype:
		filters["name"] = doctype

	# use a distinct loop variable so the `doctype` parameter is not shadowed
	for doctype_name in frappe.get_all("DocType", filters=filters, pluck="name"):
		try:
			dropped_columns = trim_table(doctype_name, dry_run=dry_run)
			if dropped_columns:
				updated_tables[doctype_name] = dropped_columns
		except frappe.db.TableMissingError:
			if quiet:
				continue
			click.secho(f"Ignoring missing table for DocType: {doctype_name}", fg="yellow", err=True)
			click.secho(
				f"Consider removing record in the DocType table for {doctype_name}", fg="yellow", err=True
			)
		except Exception as e:
			if quiet:
				continue
			click.echo(e, err=True)

	return updated_tables


def trim_table(doctype, dry_run=True):
	"""Drop database columns of `doctype` that no longer have a DocField.

	Args:
		doctype: Name of the doctype whose table should be trimmed.
		dry_run: If True, return removable columns without dropping them.

	Return the list of dropped (or droppable, when dry_run) column names.
	"""
	# invalidate cached column list, since we may alter the table below
	key = f"table_columns::tab{doctype}"
	frappe.cache.delete_value(key)
	ignore_fields = default_fields + optional_fields + child_table_fields
	columns = frappe.db.get_table_columns(doctype)
	fields = frappe.get_meta(doctype, cached=False).get_fieldnames_with_value()

	def is_droppable(field):
		# keep standard/optional framework columns and `_`-prefixed internal columns
		return field not in ignore_fields and not field.startswith("_")

	columns_to_remove = [f for f in list(set(columns) - set(fields)) if is_droppable(f)]

	if columns_to_remove and not dry_run:
		# build the DDL in a separate variable instead of reusing the list name
		drop_clause = ", ".join(f"DROP `{c}`" for c in columns_to_remove)
		frappe.db.sql_ddl(f"ALTER TABLE `tab{doctype}` {drop_clause}")

	return columns_to_remove


def _update_field_order_based_on_insert_after(field_order, insert_after_map):
	"""Update the field order based on insert_after_map"""

	retry_field_insertion = True

	while retry_field_insertion:
		retry_field_insertion = False

		for fieldname in list(insert_after_map):
			if fieldname not in field_order:
				continue

			custom_field_index = field_order.index(fieldname)
			for custom_field_name in insert_after_map.pop(fieldname):
				custom_field_index += 1
				field_order.insert(custom_field_index, custom_field_name)

			retry_field_insertion = True

	if insert_after_map:
		# insert_after is an invalid fieldname, add these fields to the end
		for fields in insert_after_map.values():
			field_order.extend(fields)


# Names of all `cached_property` attributes on Meta; these keys are skipped during serialization.
CACHE_PROPERTIES = frozenset(prop for prop, value in vars(Meta).items() if isinstance(value, cached_property))


def _serialize(doc, no_nulls=False, *, is_child=False):
	"""Convert a document into a plain, JSON-serializable dict.

	Only simple values (str/int/float/datetime, and None unless `no_nulls`)
	are kept; child-table rows are serialized recursively, and keys backed by
	cached properties are dropped.
	"""
	out = {}
	for key, value in doc.__dict__.items():
		if not is_child:
			# skip values produced by cached_property attributes
			if key in CACHE_PROPERTIES:
				continue

			if isinstance(value, ListOrTuple):
				# lists of BaseDocument are child tables; other lists are dropped
				if value and isinstance(value[0], BaseDocument):
					out[key] = [_serialize(d, no_nulls=no_nulls, is_child=True) for d in value]

				continue

		# None is kept only when no_nulls is False
		if (not no_nulls and value is None) or isinstance(value, SerializableTypes):
			out[key] = value

	if not is_child:
		# set empty lists for unset table fields
		for fieldname in TABLE_DOCTYPES_FOR_DOCTYPE:
			if out.get(fieldname) is None:
				out[fieldname] = []

	return out


if typing.TYPE_CHECKING:
	# This is DX hack to add all fields from DocType to meta for autocompletions.
	# Meta is technically doctype + special fields on meta.
	from frappe.core.doctype.doctype.doctype import DocType

	# Exists only for type checkers; at runtime `get_meta` returns a plain Meta.
	class _Meta(Meta, DocType):
		pass


# backward compatibility
is_single = is_single_doctype
